import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * Demo of a grouped aggregation with the Flink Table API.
 *
 * <p>A small in-memory collection of orders is registered as the temporary view
 * {@code "Orders"}, the {@code amount} column is summed per {@code (user, product)}
 * pair, and the result is printed as a retract stream.
 *
 * <p>Output recorded by the original author for this input:
 * <pre>
 *   7> (true,3,rubber,2)
 *   4> (true,1,beer,3)
 *   2> (true,1,diaper,4)
 *   7> (false,3,rubber,2)
 *   7> (true,3,rubber,8)
 * </pre>
 * The leading {@code N>} is presumably the index of the printing subtask and may
 * differ between runs - TODO confirm.
 *
 * <p>Meaning of the boolean flag (per the original author's notes): the output is the
 * changelog form of a Flink dynamic table, where {@code true} marks an added row and
 * {@code false} marks a deleted (retracted) row. An append stream only ever appends,
 * whereas a retract stream deletes and then re-adds: when the sum for a key changes,
 * the previously emitted row is flagged {@code false} and the new row is flagged
 * {@code true}.
 *
 * <p>NOTE(review): {@code Order} is declared outside this file; it is assumed to expose
 * the fields {@code user}, {@code product} and {@code amount} - verify against its
 * definition.
 */
public class GroupByAggregation {

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments; not read
     * @throws Exception propagated from the job execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Expose the sample orders to the Table API under the name "Orders".
        DataStream<Order> orderStream = sampleOrders(streamEnv);
        tableEnv.createTemporaryView("Orders", orderStream);

        Table totals = sumAmountPerUserAndProduct(tableEnv.from("Orders"));

        // Printed as a retract stream: the running sum for a key is updated as more
        // rows arrive, so earlier results have to be withdrawn (flag false) and re-issued.
        tableEnv.toRetractStream(totals, Row.class).print();

        streamEnv.execute();
    }

    /** Creates the fixed four-element order stream used as input for the demo. */
    private static DataStream<Order> sampleOrders(StreamExecutionEnvironment streamEnv) {
        return streamEnv.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 6)));
    }

    /** Groups by {@code user} and {@code product} and sums {@code amount} into a column named {@code cnt}. */
    private static Table sumAmountPerUserAndProduct(Table orders) {
        return orders
                .groupBy($("user"), $("product"))
                .select(
                        $("user"),
                        $("product"),
                        $("amount").sum().as("cnt"));
    }
}
